IoT 业务之前使用阿里云商用版 RocketMQ,但设备量上来之后,费用很高,
因此自行实现了一个基于 Redis List(rightPush + leftPop)方案的轻量级 MQ:
IoT 网关端只负责 rightPush 写入消息,业务端多线程消费。
- 调用方式
/**
 * Creates and starts the Redis-list backed consumer when
 * {@code redisPopMq.enable=true}. The container calls {@code shutdown()} on
 * the returned bean when the context is destroyed.
 *
 * @return the started consumer
 */
@Bean(destroyMethod = "shutdown")
@ConditionalOnProperty(name = "redisPopMq.enable",havingValue = "true")
public RedisPopMqConsumer redisPopMqConsumer(){
    Properties properties = new Properties();
    properties.put("redisListKey",redisListKey);
    // The consumer reads its settings with Properties.getProperty(), which only
    // returns values stored as String. Storing a numeric threadCount with put()
    // would be ignored silently and the default thread count used instead, so
    // the value is always stored in its String form.
    properties.setProperty("threadCount", String.valueOf(threadCount));
    RedisPopMqConsumer consumer = DefaultRedisPopMqConsumer.createConsumer(properties,stringRedisTemplate,redisPopMqListener);
    consumer.start();
    log.info("RedisPopMqConsumer start");
    return consumer;
}
- DefaultRedisPopMqConsumer核心代码如下
/**
 * Message consumer backed by a Redis list.
 *
 * <p>One dispatcher thread pops messages from the head of {@code redisListKey}
 * and hands them to a fixed-size worker pool that invokes the
 * {@link RedisPopMqListener}. A message whose listener call throws is parked in
 * an in-memory {@link DelayQueue} and re-dispatched by a second thread after
 * {@code retryDelaySeconds}, at most {@code maxRetryTime} times. An optional
 * third thread periodically logs worker-pool statistics.
 *
 * <p>Messages waiting in the in-memory retry queue are not persisted; they are
 * lost if the process stops before they are retried.
 */
public class DefaultRedisPopMqConsumer implements RedisPopMqConsumer {
    private static final Logger log = Logger.getLogger(DefaultRedisPopMqConsumer.class.getName());
    /** Worker count used when {@code threadCount} is missing, unparsable or below 1. */
    private static final int DEFAULT_THREAD_COUNT = 4;
    /** Pause after a failed Redis call, so an unreachable server is not hammered. */
    private static final int REDIS_ERROR_BACKOFF_MS = 1000;

    private final DelayQueue<DelayedMsg> delayQueue = new DelayQueue<>();
    private volatile boolean isStarted = false;
    private RedisPopMqListener listener;
    private String redisListKey;
    private int threadCount;
    private int retryDelaySeconds;
    private int maxRetryTime = 3;
    private StringRedisTemplate redisTemplate;
    /** Worker pool that runs the listener. */
    private ThreadPoolExecutor consumerExecutor;
    /** Pool hosting the dispatcher, retry and statistics loops; kept so shutdown() can stop it. */
    private java.util.concurrent.ExecutorService mainExecutor;
    private int statisticsIntervalSeconds = 5;

    private DefaultRedisPopMqConsumer() {
    }

    /**
     * Builds a consumer that is configured but not yet started.
     *
     * @param properties          settings, see {@link #setProperties(Properties)}
     * @param stringRedisTemplate template used to pop messages from Redis
     * @param listener            callback invoked for every message
     * @return the configured consumer; call {@code start()} to begin consuming
     */
    public static RedisPopMqConsumer createConsumer(Properties properties, StringRedisTemplate stringRedisTemplate, RedisPopMqListener listener) {
        RedisPopMqConsumer consumer = new DefaultRedisPopMqConsumer();
        consumer.setProperties(properties);
        consumer.setRedisTemplate(stringRedisTemplate);
        consumer.setListener(listener);
        return consumer;
    }

    @Override
    public boolean isStarted() {
        return isStarted;
    }

    @Override
    public boolean isClosed() {
        return !isStarted;
    }

    /**
     * Starts the worker pool and the background loops. Calling it on an already
     * started consumer is a no-op.
     *
     * @throws IllegalStateException if the list key, Redis template or listener is missing
     */
    @Override
    public synchronized void start() {
        if (isStarted) {
            return;
        }
        if (redisListKey == null || redisTemplate == null || listener == null) {
            throw new IllegalStateException("redisListKey, redisTemplate and listener must be set before start()");
        }
        if (threadCount < 1) {
            // A non-positive size would make the pool constructor throw and the
            // delay computation divide by zero.
            threadCount = DEFAULT_THREAD_COUNT;
        }
        consumerExecutor = new ThreadPoolExecutor(threadCount, threadCount,
                0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(threadCount * 3),
                new ThreadFactoryBuilder().setNameFormat("redisPopMqConsumerPool-%d").build());
        boolean printStatistics = statisticsIntervalSeconds > 0;
        mainExecutor = Executors.newFixedThreadPool(printStatistics ? 3 : 2,
                new ThreadFactoryBuilder().setNameFormat("redisPopMqMainPool-%d").build());
        // Set the flag before submitting: every loop exits as soon as it reads false.
        isStarted = true;
        mainExecutor.execute(this::consumerRedisMsg);
        mainExecutor.execute(this::consumerRetryDelayMsg);
        // Optional thread that periodically logs consumption statistics.
        if (printStatistics) {
            mainExecutor.execute(this::printConsumerStatistics);
        }
        log.info("redisPopMq consumer started");
    }

    /**
     * Logs worker-pool statistics every {@code statisticsIntervalSeconds} until
     * the consumer is stopped. Returns immediately when the interval is not
     * positive or the consumer is not started.
     */
    public void printConsumerStatistics() {
        if (statisticsIntervalSeconds <= 0) {
            return;
        }
        while (isStarted) {
            var statistics = getConsumerStatistics();
            log.info("redisPopMq consumer statistics:" + statistics);
            SleepUtil.sleepMs(statisticsIntervalSeconds * 1000);
        }
    }

    /**
     * @return a snapshot of the worker pool (active threads, pool size, queued
     *         tasks, completed tasks, timestamp), or {@code null} when the
     *         consumer is not started
     */
    @Override
    public ConsumerStatistics getConsumerStatistics() {
        if (!isStarted) {
            return null;
        }
        return new ConsumerStatistics(
                consumerExecutor.getActiveCount(),
                consumerExecutor.getPoolSize(),
                consumerExecutor.getQueue().size(),
                consumerExecutor.getCompletedTaskCount(),
                new Date());
    }

    /**
     * Back-pressure check: true once at least {@code threadCount} tasks are
     * waiting in the worker queue (its capacity is three times that).
     */
    private boolean consumerQueueIsFull() {
        return consumerExecutor.getQueue().size() >= threadCount;
    }

    /**
     * Pause used when there is nothing to do or the pool is saturated:
     * 10 + 128/threadCount ms, i.e. 138 ms for one thread, approaching 10 ms as
     * the thread count grows.
     */
    private int idleDelayMs() {
        return 10 + 128 / Math.max(1, threadCount);
    }

    /** Point in time at which a message that failed just now becomes due for retry. */
    private Date nextRetryTime() {
        return new Date(System.currentTimeMillis() + retryDelaySeconds * 1000L);
    }

    /**
     * Dispatcher loop: pops messages from the Redis list and submits them to
     * the worker pool until the consumer is stopped.
     */
    private void consumerRedisMsg() {
        final int delayMs = idleDelayMs();
        // Pause after a successful dispatch: 0 ms above 16 threads, 1 ms for
        // 9-16 threads, 2 ms for 8 threads or fewer.
        final int sleepMs = threadCount > 16 ? 0 : threadCount > 8 ? 1 : 2;
        while (isStarted) {
            if (consumerQueueIsFull()) {
                // Pool saturated: back off before looking at Redis again.
                SleepUtil.sleepMs(delayMs);
                continue;
            }
            String msg;
            try {
                // NOTE(review): with a timeout of 0 Spring Data Redis presumably issues a
                // blocking BLPOP without timeout, which waits until an element arrives
                // (or the client's own command timeout fires) - confirm this is intended.
                msg = redisTemplate.opsForList().leftPop(redisListKey, 0L, TimeUnit.SECONDS);
            } catch (RuntimeException e) {
                if (!isStarted) {
                    break;
                }
                // A Redis failure must not kill the dispatcher thread: log, back off, retry.
                log.log(java.util.logging.Level.WARNING, "redisPopMq pop msg error,key:" + redisListKey, e);
                SleepUtil.sleepMs(REDIS_ERROR_BACKOFF_MS);
                continue;
            }
            if (msg == null) {
                SleepUtil.sleepMs(delayMs);
                continue;
            }
            dispatchNewMsg(msg);
            SleepUtil.sleepMs(sleepMs);
        }
    }

    /**
     * Submits a freshly popped message to the worker pool. A listener failure
     * schedules the first retry; a rejected submission hands the message back to Redis.
     */
    private void dispatchNewMsg(String msg) {
        try {
            consumerExecutor.execute(() -> {
                try {
                    listener.consume(msg);
                } catch (Exception e) {
                    log.log(java.util.logging.Level.WARNING, "redisPopMq consume msg error,msg:" + msg, e);
                    if (maxRetryTime > 0) {
                        // Park the message in the delay queue; no retry has been performed yet.
                        delayQueue.put(new DelayedMsg(msg, nextRetryTime(), 0));
                    } else {
                        log.warning("redisPopMq retry disabled, msg dropped,msg:" + msg);
                    }
                }
            });
        } catch (java.util.concurrent.RejectedExecutionException e) {
            // The pool is shut down or overflowing: the message has already left Redis,
            // so put it back at the head of the list instead of losing it.
            returnToRedis(msg);
        }
    }

    /** Puts a popped but undispatched message back at the head of the Redis list. */
    private void returnToRedis(String msg) {
        try {
            redisTemplate.opsForList().leftPush(redisListKey, msg);
        } catch (RuntimeException e) {
            log.log(java.util.logging.Level.SEVERE, "redisPopMq could not return msg to redis, msg lost,msg:" + msg, e);
        }
    }

    /**
     * Retry loop: takes messages whose delay has expired and re-dispatches them
     * to the worker pool until the consumer is stopped. {@code retryCount} on a
     * {@code DelayedMsg} is the number of retries already performed, so a
     * message is retried at most {@code maxRetryTime} times.
     */
    public void consumerRetryDelayMsg() {
        final int delayMs = idleDelayMs();
        while (isStarted) {
            if (consumerQueueIsFull()) {
                // Same back-pressure as the dispatcher loop, so retries cannot overflow the queue.
                SleepUtil.sleepMs(delayMs);
                continue;
            }
            DelayedMsg delayedMsg;
            try {
                delayedMsg = delayQueue.take();
            } catch (InterruptedException e) {
                // Interruption is the stop signal sent by shutdown(): restore the flag and leave.
                Thread.currentThread().interrupt();
                break;
            }
            if (Objects.isNull(delayedMsg)) {
                continue;
            }
            final DelayedMsg current = delayedMsg;
            var msg = current.getMsg();
            var retryCount = current.getRetryCount();
            try {
                consumerExecutor.execute(() -> {
                    try {
                        listener.consume(msg);
                    } catch (Exception e) {
                        if (retryCount + 1 < maxRetryTime) {
                            log.log(java.util.logging.Level.WARNING, "redisPopMq consume delayed msg error,msg:" + msg, e);
                            delayQueue.put(new DelayedMsg(msg, nextRetryTime(), retryCount + 1));
                        } else {
                            log.log(java.util.logging.Level.WARNING, "redisPopMq consume delayed msg error > maxRetry time,msg:" + msg, e);
                        }
                    }
                });
            } catch (java.util.concurrent.RejectedExecutionException e) {
                // Keep the message for a later attempt; sleep because its delay has
                // already expired and it would otherwise be taken again at once.
                delayQueue.put(current);
                SleepUtil.sleepMs(delayMs);
            }
        }
    }

    /**
     * Stops the background loops and the worker pool, waiting up to 3 seconds
     * for in-flight listener calls to finish. Calling it on a consumer that is
     * not started is a no-op.
     */
    @Override
    public synchronized void shutdown() {
        if (!isStarted) {
            return;
        }
        isStarted = false;
        // Interrupt the loops: the retry thread may be parked in delayQueue.take().
        mainExecutor.shutdownNow();
        // Without shutdown() the pool never terminates and awaitTermination only times out.
        consumerExecutor.shutdown();
        try {
            if (!consumerExecutor.awaitTermination(3, TimeUnit.SECONDS)) {
                log.warning("redisPopMq consumer workers still running after 3s");
            }
            log.info("redisPopMq consumer shutdown");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reads the consumer settings.
     *
     * <ul>
     *   <li>{@code redisListKey} - required, Redis list to consume</li>
     *   <li>{@code threadCount} - worker threads, default 4</li>
     *   <li>{@code reTryDelaySeconds} (or {@code retryDelaySeconds}) - delay before a retry, default 10</li>
     *   <li>{@code retryTime} - maximum number of retries, default 3</li>
     *   <li>{@code statisticsIntervalSeconds} - statistics log interval, default 5; values of 0 or less disable it</li>
     * </ul>
     *
     * @throws IllegalArgumentException if {@code redisListKey} is missing or blank
     */
    @Override
    public void setProperties(final Properties properties) {
        var redisListKey = readProperty(properties, "redisListKey");
        if (Objects.isNull(redisListKey) || redisListKey.isBlank()) {
            throw new IllegalArgumentException("redisListKey is null");
        }
        this.redisListKey = redisListKey;
        int parsedThreadCount = parseNum(readProperty(properties, "threadCount"), DEFAULT_THREAD_COUNT);
        this.threadCount = parsedThreadCount < 1 ? DEFAULT_THREAD_COUNT : parsedThreadCount;
        // Retry delay: the historical key spelling is kept, the regular spelling is accepted too.
        var retryDelaySecondsStr = readProperty(properties, "reTryDelaySeconds");
        if (Objects.isNull(retryDelaySecondsStr)) {
            retryDelaySecondsStr = readProperty(properties, "retryDelaySeconds");
        }
        this.retryDelaySeconds = parseNum(retryDelaySecondsStr, 10);
        // Maximum number of retries.
        this.maxRetryTime = parseNum(readProperty(properties, "retryTime"), 3);
        this.statisticsIntervalSeconds = parseNum(readProperty(properties, "statisticsIntervalSeconds"), statisticsIntervalSeconds);
    }

    @Override
    public void setListener(final RedisPopMqListener listener) {
        this.listener = listener;
    }

    @Override
    public void setRedisTemplate(final StringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * Returns the value for {@code key} as text. {@code getProperty} ignores
     * values that are not Strings, so a value stored with {@code put} (for
     * example an Integer) is picked up through {@code get} and converted.
     */
    private static String readProperty(Properties properties, String key) {
        String value = properties.getProperty(key);
        if (value != null) {
            return value;
        }
        Object raw = properties.get(key);
        return raw == null ? null : raw.toString();
    }

    /** Parses {@code numStr} as an int, returning {@code defaultNum} when it is null, blank or not a number. */
    private int parseNum(String numStr, int defaultNum) {
        if (Objects.isNull(numStr) || numStr.isBlank()) {
            return defaultNum;
        }
        try {
            return Integer.parseInt(numStr.strip());
        } catch (NumberFormatException e) {
            return defaultNum;
        }
    }
}